Pairwise Alignment

Teo Sakel

10/01/2022

Why Sequence Similarity?

Read Mapping

Homology

Thomas Shafee, CC BY-SA 4.0, via Wikimedia Commons

Rost, Burkhard. “Twilight zone of protein sequence alignments.” Protein engineering (1999)

What is Sequence Similarity?

Distance Metrics

Edit Distance

  1. Hamming distance: only substitutions
  2. Longest common sub-sequence (LCS): only insertions/deletions
  3. Levenshtein distance: substitutions and insertions/deletions

Adapted from SCHMID, R. (2006). Computational Genome Analysis

Hamming Distance

def hamming(S1, S2):
    assert len(S1) == len(S2)
    return sum(x != y for x, y in zip(S1, S2))
Distance of "this" vs "that" = 2

Padded Hamming Distance

To extend the definition to strings of unequal length:

  • Define an extra “gap” character -
  • Pad the shortest sequence with gaps until it matches the length of the longer.
  • Count the number of differences between the extended sequences

Padded Hamming Distance

def hamming(S0, S1):
    L = len(S0), len(S1)
    if L[0] == L[1]:
        return sum(x != y for x, y in zip(S0, S1))
    if L[0] < L[1]:
        # reverse so that S0 is longer (distance is symmetric)
        S0, S1 = S1, S0
        padding = '-' * (L[1] - L[0])
    else:
        padding = '-' * (L[0] - L[1])
    
    pref = hamming(S0, padding + S1)  # pad at the begining
    suff = hamming(S0, S1 + padding)  # pad at the end
    dist = min(pref, suff) # shortest path from S0 -> S1
    return min(pref, suff)  
Distance of "BANANA" vs "BANA" = 2
 pref  vs  suff
BANANA    BANANA
--BANA    BANA--

Alignment

  • Given a sequence \(S\), an extended sequence \(S'\) is an arbitrary extension of \(S\) with gap symbols -.
  • A global alignment of \(S\) and \(Q\) is a 1-1 matching of their extended sequences \(S'\) and \(Q'\). The matching must be co-linear (no permutations of the letters is allowed) and have no pairs of gaps aligned to each other.
  • A local alignment is similar to a global one but not for the whole of \(S'\) and \(Q'\) just contiguous segments.

Alignment Example

Input

S = HEAGAWGHEEAHGEGAE
Q = PAWHEAEHE

Global Alignment

HEAGAWGHEEAHGEGAE
--P-AW-HE-A--EH-E

Local Alignment

WGHEEAHGE
AW-HEAEHE

Levenshtein = Hamming of Aligned Sequences

  • Substitutions : if both sequences have a non-gap symbol there
  • Insertions : if the \(S'\) sequence has a gap. \(Q\) has an extra symbols “inserted” at this position.
  • Deletions : if the \(Q'\) sequence has a gap. \(Q\) has a symbol “deleted” at this position
dHamm = hamming('MISSISSIPI', 'MISISSIPI')
dLeve = hamming('MISSISSIPI', 'MIS-ISSIPI')
Hamming = 3 | Levensthein = 1
------------|----------------
MISSISSIPI  |   MISSISSIPI
-MISISSIPI  |   MIS-ISSIPI

Weighted Distance

Amino Acids Nucleotides
Amino Acid by property transition_vs_transversion

Substitution Matrices

Weighted Hamming Distance

def hamming(S0, S1, M):
    # score given by matrix M
    L = len(S0), len(S1)
    if L[0] == L[1]:
        return sum(M[x, y] for x, y in zip(S0, S1)) 
    if L[0] < L[1]:
        S0, S1 = S1, S0
        padding = '-' * (L[1] - L[0])
    else:
        padding = '-' * (L[0] - L[1])
    
    pref = hamming(S0, padding + S1, M) 
    suff = hamming(S0, S1 + padding, M)
    return max(pref, suff)  # max instead of min
Distance of "PANAMA" vs "PAMA" based on BLOSUM62 = 5

Dot Plot

Definition

A visual representation of all possible local alignments.

Patterns

  • Repeats: tomorrow and tomorrow and tomorrow
  • Palidromes: can i see bees in a cave

Computation

def dot_matrix(X, M, Y=None):
    if Y is None:
        Y = X
    dot = np.zeros((len(X), len(Y)), dtype=int)
    for i, x in enumerate(X):
        for j, y in enumerate(Y):
            dot[i, j] = M[x, y]
    return dot

Example: P00738

Denoising Alignments

  1. Filtering (Threshold)
  2. Redundancy (k-mers)

Filtering

Trade-off variance (noise) for bias (away from 0)

Redundancy

\(k\)-mers are substrings of length \(k\) contained in a sequence

  • are more robust to noise (mutations)
  • carry more information (\(\sum_{a=1}^k -\log_2 p_a\))
  • can “vectorize” the text (dimension: \(|\Sigma|^k\))
def iter_words(text, k):
    L = len(text) - k + 1
    for i in range(L):
        yield text[i:i+k]
MIS
 ISS
  SSI
   SIS
    ISS
     SSI
      SIP
       IPP
        PPI
MISSISSIPPI

Denoised Dot Matrix

def dot_matrix(X, M, Y=None, k=1, T=0):
    if Y is None:
        Y = X
    dot = np.zeros((len(X), len(Y)), dtype=int)
    for i, x in enumerate(iter_words(X, k)):
        for j, y in enumerate(iter_words(Y, k)):
            dot[i, j] = M(x, y)
    dot[dot < T] = 0
    return dot

Denoised Dot Matrix

Ungapped Alignment - BLAST

Seed and Extend Strategy

With some extras…

  1. Index: dict of \(k\)-mers positions
  2. Seed: starting points
  3. Extend: ungapped alignment
  4. Xdrop: early stopping heuristic
  5. E-value: probability model for alignments
  6. Filtering: use threshold to remove noise

BLAST90

Adapted form Harris, R.S., (2007) “Improved pairwise alignment of genomic DNA”

BLAST90 Implementation

class BLAST90:
    def __init__(self, reference, score, k=3, T=13):
        self.score = score  # score matrix
        self.k = k  # length of word
        self.T = T  # Threshold
        self.store_reference(reference)
        self.scaling_const = self.score.calc_K_lambda(self.ref)
        self.index = self.build_index(self.ref)
    
    def store_reference(self, reference):
        '''Concatanate the sequences of all proteins
        
        Sequences are separated by a string of `k` gaps (-).
        That way there can be no seed spanning 2 proteins.
        It also creates a list of "ranges" spanned by each 
        protein in the combined reference. Thus protein[i]
        spans the region of the reference between 
        prot_ranges[i-1]:prot_ranges[i]
        '''
        self.proteins = list(reference.keys())
        self._prot_ranges = []
        self.ref = ''
        sep = self.score.gap_char * self.k  # protein separator
        for prot, seq in reference.items():
            self.ref += seq + sep
            self._prot_ranges.append((len(self.ref), prot))

    def build_index(self, reference):
        '''Returns an index of informative words (k-mers).
        The index is a dictionary that maps words to positions
        in the reference that are similar to it.
        
        Iterate over all k-mers and store the positions where
        they appear in the reference. Then iterate over all
        pairs of words and merge their "hits" if they are similar.
        '''
        hits = {}
        # get positions of words
        for i, word in enumerate(self.iter_words(reference)):
            if self.score(word, word) < self.T:
                # non-informative word, self-similarity == max(score)
                continue
            try:
                hits[word].append(i)
            except KeyError:
                hits[word] = [i]
        
        # merge hits of 2 words if they are similar
        index = hits.copy()
        words = list(index.keys())
        for i, w1 in enumerate(words[:-1]):
            for w2 in words[i+1:]:
                if self.score(w1, w2) >= self.T:
                    index[w1] = self.merge_sorted(index[w1], hits[w2])
                    index[w2] = self.merge_sorted(index[w2], hits[w1])
        return index
    
    def get_protein(self, pos):
        '''Given a position in the combined reference, 
        it returns the protein it belongs to.
        
        `self._prot_ranges` is sorted by construction so we can
        use bisect_left to find quickly the index of the protein. 
        '''
        ix = bisect_left(self._prot_ranges, (pos, ))
        return self._prot_ranges[ix][1]
    
    def search(self, query, Xdrop, Eval_cutoff):
        '''Searches for query in reference and returns all the position is maps.
        
        Args:
            - query: string to look for
            - Xdrop: heuristic to stop expanding the match
            - Eval_cutoff: ignore matches with greater E-value.
        
        Return:
            list with elements of the form (Eval, seed, offset, prot) where:
                + Eval = E-value
                + seed = (i, j) starting position in dot-matrix coordinates
                + offset = (bck, fwd) how far forward and backwards the alignment extends
                + prot = the protein the alignment belongs to
        '''
        hits = []
        scanned = self.diagRanges()  # to avoid double checking a hit
        for i, word in enumerate(self.iter_words(query)):
            for j in self.index.get(word, []):
                seed = (i, j)
                if seed in scanned: 
                    continue
                Eval, offset = self.extend_seed(seed, query, Xdrop)
                scanned.append(seed, offset)
                if Eval < Eval_cutoff:
                    prot = self.get_protein(j)
                    # heappush keeps `hits` "sorted"
                    heappush(hits, (Eval, seed, offset, prot))
        return [heappop(hits) for i in range(len(hits))]
    
    def extend_seed(self, seed, pattern, Xdrop):
        '''Given a starting position (seed) and a pattern it extend the
        alignment in both directions until the score drops below Xdrop.
        '''
        k = self.k
        i, j = seed  # i: query, j: subject
        # extend forward
        query   = self.iter_seq(pattern , i, 1)
        subject = self.iter_seq(self.ref, j, 1)
        score_f, off_f = self.extend_ungapped(query, subject, Xdrop)
        # extend in reverse
        query   = self.iter_seq(pattern , i+k, -1)
        subject = self.iter_seq(self.ref, j+k, -1)
        score_r, off_r = self.extend_ungapped(query, subject, Xdrop)
        # final score = fwd + rev - word (word was added twice)
        score = score_f + score_r - self.score(pattern[i:i+k], self.ref[j:j+k])
        l     =   off_f +   off_r - k  # length of extension
        m = len(pattern) 
        Eval = self.compute_Eval(score, m, l)
        return Eval, (k - off_r, off_f)

    def extend_ungapped(self, pattern, ref, Xdrop):
        '''Extends the pattern along the reference until the score drops 
        Xdrop below the maximum.
        '''
        score = 0  # running score
        max_score, imax = 0, 0
        for i, (p, r) in enumerate(zip(pattern, ref)):
            score += self.score[p, r]
            if score >= max_score:
                # '==' is included because new max is longer
                max_score, imax = score, i
            elif max_score - score > Xdrop:
                break
        return max_score, imax + 1
    
    def compute_Eval(self, score, m, l):
        '''Computes the E-val statistic'''
        K, lam = self.scaling_const
        S = (lam * score - np.log(K)) / np.log(2)
        n = len(self.ref) - l + 1
        m = m - l + 1
        E = m * n * 2**(-S)
        return E
        
    def iter_words(self, text):
        '''Iterate over all k-mers of a text'''
        N = len(text)
        k = self.k
        for i in range(N-k+1):
            yield text[i:i+k]
    
    def iter_seq(self, text, offset=0, step=1):
        '''Iterate over text forward or backward (by step) starting at offset'''
        if step > 0:
            text_range = range(offset, len(text), step)
        else:
            text_range = range(offset-1, -1, step)
        for i in text_range:
            yield text[i]
    
    def iter_seeds(self, query):
        '''Get all the potential seeds for a query'''
        for i, word in enumerate(self.iter_words(query)):
            for j in self.index.get(word, []):
                yield i, j
    
    @staticmethod
    def merge_sorted(X, Y):
        '''
        Given 2 value ascending lists X and Y create a new sorted list Z.
        Concatanating and sorting afterwards would be wasteful since X, Y have order we can exploit.
        Based on actual `merge_sort`
        '''
        Lx, Ly = len(X), len(Y)
        # check if there is something to merge
        if Lx == 0:
            return Y
        if Ly == 0:
            return X
        
        Z = [None] * (Lx + Ly)
        iz, ix, iy = 0, 0, 0
        while True:
            if X[ix] > Y[iy]:
                Z[iz] = Y[iy]
                iy += 1
            else:
                Z[iz] = X[ix]
                ix += 1
            iz += 1
            if ix == Lx: # X is exhausted
                Z[iz:] = Y[iy:]
                break
            elif iy == Ly: # Y is exhausted
                Z[iz:] = X[ix:]
                break
        return Z
    
    # inside BLAST
    class diagRanges:
        '''
        Data structure to keep explored diagonals.
        Seeds may be position close to each other and some extention may cover them.
        Changing the staring point of the HSP does not affect the outcome.
        Extention is bi-directional and only depends on max_score and Xdrop
        '''
        def __init__(self):
            self.ranges = {}
        
        def append(self, coord, offset):
            row, col = coord
            diag = col - row
            interval = range(row+offset[0], row+offset[1])
            try:
                self.ranges[diag].append(interval)
            except KeyError:
                self.ranges[diag] = [interval]
        
        def __contains__(self, coord):
            row, col = coord
            diag = diag = col - row
            for interval in self.ranges.get(diag, []):
                if row in interval or col in interval:
                    return True
            return False
        
def print_blast(aln, query, ref):
    Eval, seed, offset, prot = aln
    i, j = seed
    b, f = offset
    qseq = query[i-b:i+f]
    rseq = str(ref[j-b:j+f])
    mis  = ''.join('|' if r == q else ' ' for q, r in zip(qseq, rseq))
    head = f'Protein: {prot}  E-value: {Eval:.2E}'
    print('\n'.join([head, 
                     '-' * len(head), 
                     'Query: ' + qseq,
                     '       ' + mis,
                     'Sbjct: ' + rseq]))

Example

BLAST Analysis - Xdrop

Motivation:

  1. Efficiency: less time spend per seed
  2. Resolution: avoid linking “independent” alignments

Random Ungapped Alignments

E-value metric:

How many random local alignments of score \(S\) or higher do we expect to recover for a query of length \(m\)?

Model Parameters

Alignment are computed in 2 main steps:

  1. Seeding
  2. Extension

Random Alignment Generator

def random_sequencer(m, alphabet, p=None):
    if p is not None:
        p = np.array(p)
        assert np.all(p >= 0)
        p = p / sum(p)
    rng = np.random.default_rng()
    while True:
        yield ''.join(rng.choice(alphabet, m, replace=True, p=p))

amino_freq = get_amino_freq()  # dictionary {A: freq_A, ...}
aminos, amino_p = zip(*amino_freq.items())

sequence_generator = random_sequencer(10, aminos, amino_p)
for i in range(10):
    s = next(sequence_generator)
    print(s)
SDGAYTVDRV
VIRQTPLRRL
RKAEIELRVI
AQEGSYNCTT
FKLSIAVRNG
VRASPTRRRH
DRDGGMVEYN
LMFWKGFPRT
VATARSLSRQ
GKGMQEPSFD

Seeding

Bigger matrices result in more seeds:

\[E \sim mn\]

Alignment Score

The score of a random ungapped alignment equivalent to a negatively biased random walk.

Alignment Score

The probability of observing a score drops exponentially:

\[E \sim e^{-\lambda s}\]

Karlin-Altschul Theory

Building on these 2 intuition we can sketch a theory:

  • The expected number of alignments: \(E(S, m) = Kmne^{\lambda S}\)
  • Given just \(E\) the distribution with the fewest added assumptions is the Poisson
  • The probability of observing one or more alignment with a given score is:

\[P(N(S) \ge 1) = 1 - e^E\]

BLAST Score

BLAST uses a normalized version of the score to calculate the E-value:

\[ \begin{align} S' &= \frac{\lambda S - \log K}{\log 2}\\ E &= mn 2^{-S'} \end{align} \]

Gapped Alignment (Levenstein Distance)

The Problem

Dynamic Programming

From Wikipedia

Dynamic programming is a method for solving a complex problem by breaking it down into a collection of simpler subproblems, solving each of those subproblems just once, and storing their solutions. The next time the same subproblem occurs, instead of recomputing its solution, one simply looks up the previously computed solution, thereby saving computation time at the expense of a (hopefully) modest expenditure in storage space.

Alignment Problem Structure

We want to expand this approach now to address the optimal alignment problem. So we have to

  1. Recognize the “size” of the problem
  2. Imagine how we could construct the optimal solution from smaller ones.
  3. Apply the same scheme to the smaller problems until we reach a tractable problem.
  4. Store the solutions of the simpler problems and use them to solve more complex one in order.

Size of the problem

A natural metric of size is the length of the (finally) aligned sequences (bigger than either)

\[ \begin{aligned} \hat{X} &= \hat{x}_1\hat{x}_2 \dots \hat{x}_k & \hat{x} &\in \{x_1,\dots,x_m, \_\}\\ \hat{Y} &= \hat{y}_1\hat{y}_2 \dots \hat{y}_k & \hat{y} &\in \{y_1,\dots,y_n, \_\} \\ \end{aligned} \]

Inductive Step

Focus on the last elements \((\hat{x}_k, \hat{y}_k)\). There are 3 possibilities:

\[ \begin{aligned} \hat{x}_k &= x_m & \hat{y}_k &= y_n \\ \hat{x}_k &= x_m & \hat{y}_k &= \_ \\ \hat{x}_k &= \_ & \hat{y}_k &= y_n \end{aligned} \]

Each case correspond to a new smaller pair \((X', Y')\):

\[ \begin{aligned} X' &= x_1 \dots x_{m-1} & Y' &= y_1 \dots y_{n-1} \\ X' &= x_1 \dots x_{m-1} & Y' &= Y \\ X' &= X & Y' &= y_1 \dots y_{n-1} \end{aligned} \]

Inductive Hypothesis

If someone the solutions to \(S_{X',Y'}, S_{X',Y}, S_{X,Y'}\) were known then we can construct the final solution:

\[ S_{X,Y} = \max \begin{cases} S_{X',Y'} &+& G(x_m, y_n) \\ S_{X',Y} &+& G(x_m, \_) \\ S_{X,Y'} &+& G(\_,y_n) \end{cases} \]

Base Case

The optimal alignment of a string \(X\) with an empty string ’’ is:

\[ \begin{aligned} x_1 x_2 &\dots x_n \\ \mathtt{\_\_} &\dots \mathtt{\_\_} \end{aligned} \]

Path Graph

Cost of Edges

Induction

Needleman-Wunsch Algorithm

Finds global alignment

def NeedlemanWunsch(x, y, M):
    # Initialize Variables
    m, n = len(x) + 1, len(y) + 1
    S = np.zeros((m, n), dtype=int)  # cost matrix
    P = np.zeros((m, n), dtype=int)  # predecesor matrix
    
    # Initialize 1st column/row
    for i in range(1, m):
        S[i, 0] = S[i-1, 0] + M(x[i-1], '_')
    P[1:, 0] = 2
    
    for j in range(1, n):
        S[0, j] = S[0, j-1] + M('_', y[j-1])
    P[0, 1:] = 3
    
    # Main Recursion
    for i in range(1, m):
        for j in range(1, n):
            scores = (S[i-1, j-1] + M(x[i-1], y[j-1]), # match
                      S[i-1,  j ] + M(x[i-1],   '_' ), # insertion
                      S[ i , j-1] + M(  '_' , y[j-1])) # deletion
            
            imax = np.argmax(scores)
            S[i, j] = scores[imax]
            P[i, j] = imax + 1
            
    return S, P

Global Alignment Example

def SimpleCost(x, y):
    if x != y:
        return -1
    return 0

X, Y = "THISLINE", "ISALIGNED"
S, P = NeedlemanWunsch(X, Y, SimpleCost)

#--ISALIGNED
#THIS-LI-NE-

Semi-global Algorithm

Ignores gap at the beginning and end of alignment ~ grep

def NeedlemanWunsch(x, y, M, algorithm="global"):
    # Initialize Variables
    m, n = len(x) + 1, len(y) + 1
    S = np.zeros((m, n), dtype=int)  # cost matrix
    P = np.zeros((m, n), dtype=int)  # predecesor matrix
    
    # Initialize 1st column/row
    if algorithm == 'global':
        for i in range(1, m):
            S[i, 0] = S[i-1, 0] + M(x[i-1], '_')
        P[1:, 0] = 2
        for j in range(1, n):
            S[0, j] = S[0, j-1] + M('_', y[j-1])
        P[0, 1:] = 3
    elif algorithm == 'semi':
        pass
    else:
        raise ValueError(f'Unknown algorithm specified: {algorithm}')
    
    # Main Recursion
    for i in range(1, m):
        for j in range(1, n):
            scores = (S[i-1, j-1] + M(x[i-1], y[j-1]), # match
                      S[i-1,  j ] + M(x[i-1],   '_' ), # insertion
                      S[ i , j-1] + M(  '_' , y[j-1])) # deletion
            
            imax = np.argmax(scores)
            S[i, j] = scores[imax]
            P[i, j] = imax + 1
            
    return S, P

Example Global vs Semi-Global

Local Alignment

Smith-Waterman Algorithm

  1. Negative cell are set to 0
  2. Backtracking should start at the highest value and stop at 0.
def SmithWaterman(x, y, M):
    # Initialize Variables
    m, n = len(x) + 1, len(y) + 1
    S = np.zeros((m, n), dtype=int)  # cost matrix
    P = np.zeros((m, n), dtype=int)  # predecesor matrix
    
    # Main Recursion
    for i in range(1, m):
        for j in range(1, n):
            scores = (0,                               # restart
                      S[i-1, j-1] + M(x[i-1], y[j-1]), # match
                      S[i-1,  j ] + M(x[i-1],   '_' ), # insertion
                      S[ i , j-1] + M(  '_' , y[j-1])) # deletion
            imax = np.argmax(scores)
            S[i, j] = scores[imax]
            P[i, j] = imax
    
    return S, P

Example Global vs Local

HEAGAWGHE-E
    || || |
--P-AW-HEAE

AWGHE-E
|| || |
AW-HEAE

Generalized Gap Score

\[ G(\text{gap}) = C_{\text{open}} + C (l_{\text{gap}}) \]

Affine Gap

\[ G(\text{gap}) = C_{\text{open}} + C \cdot l_{\text{gap}}\]

Concave vs Affine Gap Penalty

SW with Affine Gap

def SmithWaterman(x, y, M, gapOpen, gapExtend):
    m, n = len(x) + 1, len(y) + 1
    S = np.zeros((m, n), dtype=int)  # score matrix
    D = np.zeros((m, n), dtype=int)  # deletion matrix
    I = np.zeros((m, n), dtype=int)  # insertion matrix
    P = np.zeros((m, n), dtype=int)  # predecesor matrix
    
    for i in range(1, m):
        for j in range(1, n):
            D[i, j] = max(S[i, j-1] - gapOpen,   # open a deletion gap
                          D[i, j-1] - gapExtend) # extend deletion gap
            I[i, j] = max(S[i-1, j] - gapOpen,   # open a insertion gap
                          D[i-1, j] - gapExtend) # extend insertion gap
            scores  = (0,
                       S[i-1, j-1] + M[x[i-1], y[j-1]],
                       I[i, j],
                       D[i, j])
            imax = np.argmax(scores)
            S[i, j] = scores[imax]
            P[i, j] = imax
    return S, P

Example Global vs Local

AWGHE-E
|| || |
AW-HEAE

AWGHEE
||   |
AWHEAE

PSI-BLAST

  1. Smith-Waterman algorithm to extend
  2. Seeds require 2 hits
  3. Iterated search

2D Xdrop

  1. Early Stopping
  2. Limits the search to a band of diagonals

Extreme Value Statistic

  • The Karlin-Altschul Theory no longer applies because we are optimizing not summing the alignment score.
  • We can swap the Poisson model with Gumbel distribution which is common for maximum values statistics